# coding: UTF-8

import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
from pyspark.sql import SQLContext
import sys

# Python 2 only: reload(sys) restores setdefaultencoding (hidden by site.py at
# startup) so the process-wide default codec can be forced to UTF-8 below.
reload(sys)
import xlrd
import os

sys.setdefaultencoding('utf-8')
# Snappy-compress any Parquet data written through Spark SQL.
conf = SparkConf().set("spark.sql.parquet.compression.codec", "snappy")
sc = SparkContext("yarn-client", "test", conf=conf)
# HiveContext for querying Hive-managed tables; a separate plain SQLContext is
# also created for building DataFrames from pandas.
# NOTE(review): "sqlContest" looks like a typo for "sqlContext" -- verify
# before renaming, it may be referenced elsewhere in the project.
sqlContext = HiveContext(sc)
sqlContest = SQLContext(sc)

def extractions_tmp_2(keyword,provide_id, nums,dir):
    sql1="""select * from games.english_edu where msg_content like '%%%(keyword)s%%' and
        and prov_id in( 18 ,22,3) """ % {"keyword":keyword,"provide_id": provide_id}
    print sql1
    df=sqlContext.sql(sql1)
    df.registerTempTable("aaaaa")
    sql="""
        select
            t2.msisdn msisdn,
            t2.city city,
            t2.provider_id provider_id
        from (
            select
                t.msisdn msisdn ,
                t.city city,
                t.provider_id provider_id
            from
                aaaaa  t
            LEFT OUTER JOIN
                (SELECT  msisdn FROM history.english_history ) t1
            ON t.msisdn = t1.msisdn
            WHERE t1.msisdn is null
        ) t2
        group by
            t2.msisdn,
            t2.city,
            t2.provider_id
        limit %(nums)d
    """ % {"nums": nums}
    print sql
    df = sqlContext.sql(sql)
    writer = pd.ExcelWriter("""/home/games_data/result/20180829/%(dir)s.xlsx""" % {"dir":dir},
                            engine='xlsxwriter')
    df.toPandas().to_excel(writer, sheet_name='Sheet1')
    writer.save()
    print """%(keyword)s OK""" %{"keyword":keyword}


def eachFile(dir, pt):
    """Load the xlsx produced by extractions_tmp_2, append its msisdn column
    (as strings) to the history ORC data for partition ``pt``, and refresh the
    Hive partition metadata.

    :param dir: basename of the xlsx under /home/games_data/result/20180829/
                (shadows the builtin ``dir``; kept for caller compatibility)
    :param pt:  partition value, e.g. '20180829'
    """
    read_file_df = pd.read_excel("""/home/games_data/result/20180829/%(dir)s.xlsx""" % {"dir": dir})
    # Reuse the module-level SQLContext instead of constructing a fresh one on
    # every call -- one context per SparkContext is sufficient.
    spark_df = sqlContest.createDataFrame(read_file_df)
    spark_df.registerTempTable("tmp_table")
    df = sqlContext.sql("""select
                cast(msisdn as string) msisdn
            from
                tmp_table""")
    df.repartition(10).write.mode("append").orc("/user/hive/warehouse/history.db/english_history/pt=%s" % pt)
    # BUG FIX: quote the partition value -- callers pass pt as a string
    # ('20180829'); the unquoted form pt=20180829 is parsed as a number by
    # HiveQL and fails to match a string-typed partition column.
    # NOTE(review): the ORC data is written under history.english_history but
    # the partitions altered belong to history.games_zcx -- confirm the table
    # names are intentional. Also DROP runs *after* the write; for a managed
    # table that would delete the partition's data -- TODO confirm intent.
    sqlContext.sql("alter table history.games_zcx drop if exists partition (pt='%s')" % pt)
    sqlContext.sql("alter table history.games_zcx add if not exists partition (pt='%s')" % pt)

# Driver: extract up to 400,000 distinct VIPKID-related subscribers into
# VIPKID.xlsx (the second argument, 1, is not referenced inside
# extractions_tmp_2's queries), then append them into the history store under
# partition 20180829.
extractions_tmp_2("VIPKID", 1,400000,"VIPKID")
eachFile("VIPKID",'20180829')

